使用Amazon SageMaker 训练因子分解机模型并应用于推荐系统。
4. 将以下代码复制到笔记本中的代码单元格中,为了demo的录制和运算的效率,我们对原始数据集进行采样,最后选择了10w条数据。
# 导入原始数据集
import pandas as pd
df_all = pd.read_csv("ratings.csv")
print(df_all.shape)
print(df_all['userId'].nunique())
print(df_all['movieId'].nunique())
# 随机采样
df = df_all.sample(n=100000)
df.drop(['timestamp'],axis=1,inplace=True)
df.to_csv("ratings_100k.csv",index=False)
user_number = df['userId'].nunique()
movie_number = df['movieId'].nunique()
print(df.shape)
print(user_number, movie_number)
5. 接下来进行训练集和测试集的划分,在这里我们采取4:1的比例,调用sklearn的train_test_split进行划分。
# 划分训练集和测试集
from sklearn.model_selection import train_test_split
df_train, df_test = train_test_split(df, test_size=0.2, random_state=42)
df_train.to_csv('100k_train.csv', index=False)
df_test.to_csv('100k_test.csv', index=False)
print(df_train.shape, df_test.shape)
print(df_train.head(5))
print(df_test.head(5))
6. 数据集中的用户ID和电影ID均为随机字符串,为了方便我们后续建立有序矩阵以及模型训练后预测结果的数据对应,我们首先为用户和电影分别建立由0开始的index序列,并使其与ID字符串对应。
# 分别建立用户和电影字典
import csv
def createIDtoIndexDict(filename, user_number, movie_number):
u_Dict = {}
m_Dict = {}
i = 0
m = 0
with open(filename, 'r') as f:
sample = csv.reader(f, delimiter=',')
for userId, movieId, rating in sample:
if userId == 'userId':
continue
else:
if userId not in u_Dict.keys():
u_Dict[userId] = i
i = i+1
if movieId not in m_Dict.keys():
m_Dict[movieId] = m
m = m+1
return u_Dict, m_Dict
u_Dict, m_Dict = createIDtoIndexDict('ratings_100k.csv', user_number, movie_number)
# print(u_Dict, m_Dict)
7. 因子分解机的训练是针对稀疏矩阵的,因此我们要将数据集中电影、评分、用户的序列转为稀疏矩阵,并根据用户评分的结果生成标签向量。我们使用Python Scipy模块中的lil_matrix来构建。生成的矩阵应当是每一个用户ID作为单独一列、每一部电影在所有用户列之后也作为单独一列,针对原数据集中每行的数据,在对应的用户列和电影列置1。标签向量以用户评分为基准,我们设定用户评分大于等于6的为“喜爱”,并在对应的标签向量位置置1,反之为“不喜爱”,标签向量相应位置置0。
训练集和测试集均进行同样的操作。
# 建立稀疏矩阵(Sparse Matrix)和标签向量(Label Vector)
import scipy
import numpy as np
def loadDataset(filename, lines, columns):
X = scipy.sparse.lil_matrix((lines, columns)).astype('float32')
Y = []
line = 0
with open (filename, 'r') as f:
sample = csv.reader(f, delimiter=',')
for userId, movieId, rating in sample:
if userId == 'userId':
continue
else:
X[line, u_Dict[userId]] = 1
X[line, user_number + m_Dict[movieId]] = 1
if rating == 'rating' or int(float(rating)) < 4:
Y.append(0)
else:
Y.append(1)
line=line+1
Y=np.array(Y).astype('float32')
return X, Y
columns = user_number + movie_number
X_train, Y_train = loadDataset('100k_train.csv', df_train.shape[0], columns)
X_test, Y_test = loadDataset('100k_test.csv', df_test.shape[0], columns)
print(X_train.shape, X_test.shape)
print(Y_train.shape, Y_test.shape)
8. 稀疏矩阵中绝大多数元素均为0,如果直接保存稀疏矩阵,会占用大量的存储空间,因此我们将其转为protobuf格式的数据,并保存到S3。
# 转换稀疏矩阵为protobuf格式,并保存到S3
import io,boto3
import sagemaker.amazon.common as smac
bucket = 'sagemaker-demo-movie-recommendation'
prefix = 'sagemaker/movie-recommendation'
train_key = 'train.protobuf'
train_prefix = '{}/{}'.format(prefix, 'train')
test_key = 'test.protobuf'
test_prefix = '{}/{}'.format(prefix, 'test')
output_prefix = 's3://{}/{}/output'.format(bucket, prefix)
def writeDatasetToProtobuf(X, Y, bucket, prefix, key):
buf = io.BytesIO()
smac.write_spmatrix_to_sparse_tensor(buf, X, Y)
buf.seek(0)
print(buf)
obj = '{}/{}'.format(prefix, key)
boto3.resource('s3').Bucket(bucket).Object(obj).upload_fileobj(buf)
print('Wrote dataset: {}/{}'.format(bucket,obj))
return 's3://{}/{}'.format(bucket,obj)
train_data = writeDatasetToProtobuf(X_train, Y_train, bucket, train_prefix, train_key)
test_data = writeDatasetToProtobuf(X_test, Y_test, bucket, test_prefix, test_key)
print(train_data)
print(test_data)
print('Output: {}'.format(output_prefix))
1. FM是Amazon SageMaker自带的算法之一,因此通过Amazon SageMaker训练模型非常容易。首先我们需要引入SageMaker的SDK,并建立Amazon SageMaker的session、定义位于该Region的因子分解机算法Container以及获取SageMaker的运行角色。随后我们定义FM训练需要的一些参数。首先是环境参数,包括之前定义好的Container、角色、输出位置和session、还包括训练使用的EC2实例,本例中采用“ml.c4.xlarge”来训练。
之后,我们需要定义FM算法的超参(Hyperparameters)。在本例中特征列为用户数与电影数的总和、预测方式为二分类(即结果为判断“喜爱”或是“不喜爱”)、最小批量为1000、epoch时期为50次。其中num_factors即为在算法介绍中提到的潜藏特征K的数量,根据Amazon SageMaker官方文档的说明,建议在2-1000之间,通常64为最优值,因此,我们也设为64。
最后为模型提供训练集和测试集在S3中的位置,训练就开始了。
# FM模型训练
from sagemaker import get_execution_role
from sagemaker.amazon.amazon_estimator import get_image_uri
import sagemaker
sess = sagemaker.Session()
role = get_execution_role()
container = get_image_uri(boto3.Session().region_name, 'factorization-machines')
fm = sagemaker.estimator.Estimator(container,
role,
train_instance_count=1,
train_instance_type='ml.c4.xlarge',
output_path=output_prefix,
sagemaker_session=sess)
fm.set_hyperparameters(feature_dim=68035,
predictor_type='binary_classifier',
mini_batch_size=1000,
num_factors=64,
epochs=50)
fm.fit({'train': train_data, 'test':test_data})
1. 最后我们进行一个简单的预测,从参与训练的电影库中推荐10部电影给某个已存在的用户。
首先生成该用于的稀疏矩阵。
# 预测:从参与训练的电影库(26004部)中推荐10部给已存在的某个用户;
# 生成该用户的稀疏矩阵
def createInferenceForUser(userId, lines, columns):
X = scipy.sparse.lil_matrix((lines, columns)).astype('float32')
line = 0
for line in range(0, lines):
X[line, u_Dict[userId]] = 1
X[line, user_number + line] = 1
line=line+1
print(X.shape)
return X
# 随机选择一个用户ID为10965
user_inference = createInferenceForUser('10965', movie_number, user_number + movie_number)
print(user_inference.shape)
2.然后调用部署好的终端节点进行预测,对预测的结果按照score进行排序选择排名前10的电影。
import json
from sagemaker.predictor import json_deserializer
#invoke endpoint
client = boto3.client('sagemaker-runtime')
def fm_serializer(data):
js = {'instances': []}
for row in data:
js['instances'].append({'features': row.tolist()})
return json.dumps(js)
# 把电影dict转为dataframe
df_m_Dict = pd.DataFrame.from_dict(m_Dict, orient='index',columns=['movie_index'])
df_m_Dict = df_m_Dict.reset_index().rename(columns = {'index':'movieId'})
inf_start = 0
step = 10
response = []
while inf_start <= user_inference.shape[0]+1:
raw_response = client.invoke_endpoint(
EndpointName = 'factorization-machines-2020-04-13-07-19-57-191',
Body = fm_serializer(user_inference[inf_start:inf_start+step].toarray()),
ContentType='application/json')
result = json.loads(raw_response['Body'].read())
for counter, p in enumerate(result['predictions']):
p['movieId'] = df_m_Dict[df_m_Dict.movie_index == (inf_start + int(counter))].movieId.values[0]
response.append(p)
inf_start = inf_start + step
print(len(response))
# 将response list转为dataframe
df_response = pd.DataFrame(response, columns=['score', 'predicted_label', 'movieId'])
df_response_like = df_response[df_response.predicted_label == 1.0]
# 按照score选择前十个电影推荐
final = df_response_like.sort_values('score', ascending=False).head(10)
print(final)